package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.commom.bean._
import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum, TravelTypeEnum}
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.streaming.dstream.DStream

object QueryDataPackage {
  //将kafka中的数据封装为对象
  def queryDataLoadAndPackage(dataStream: DStream[String]): DStream[ProcessedData] = {
    dataStream.mapPartitions(partitionIterator =>{
      //创建 json 解析
      val mapper = new ObjectMapper
      mapper.registerModule(DefaultScalaModule)
      partitionIterator.map(sourceLine =>{
        //分割数据
        val dataArray = sourceLine.split("#CS#", -1)
        //原始数据，站位，并无数据
        val sourceData = dataArray(0)
        val requestMethod = dataArray(1)
        val request = dataArray(2)
        val remoteAddr = dataArray(3)
        val httpUserAgent = dataArray(4)
        val timeIso8601 = dataArray(5)
        val serverAddr = dataArray(6)
        val highFrqIPGroup: Boolean = dataArray(7).equalsIgnoreCase("true")
        val requestType: RequestType = RequestType(FlightTypeEnum.withName(dataArray(8)), BehaviorTypeEnum.withName(dataArray(9)))
        val travelType: TravelTypeEnum = TravelTypeEnum.withName(dataArray(10))
        val requestParams: CoreRequestParams = CoreRequestParams(dataArray(11), dataArray(12), dataArray(13))
        val cookieValue_JSESSIONID: String = dataArray(14)
        val cookieValue_USERID: String = dataArray(15)
        //分析查询请求的时候不需要 book 数据
        val bookRequestData: Option[BookRequestData] = None
        //封装 query 数据
        val queryRequestData = if (!dataArray(16).equalsIgnoreCase("NULL")) {
          mapper.readValue(dataArray(16), classOf[QueryRequestData]) match {
            case value if value != null => Some(value)
            case _ => None
          }
        } else {
          None
        }
        val httpReferrer = dataArray(18)
        //封装流程数据，返回
        ProcessedData("", requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr, highFrqIPGroup, requestType, travelType, requestParams, cookieValue_JSESSIONID, cookieValue_USERID, queryRequestData, bookRequestData, httpReferrer)

      })
    })


  }


}
